{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data source\n",
    "\n",
    "The common data elements for predictive maintenance problems can be summarized as follows:\n",
    "\n",
    "* Machine features: The features specific to each individual machine, e.g. engine size, make, model, location, installation date.\n",
    "* Telemetry data: The operating condition data collected from sensors, e.g. temperature, vibration, operating speeds, pressures.\n",
    "* Maintenance history: The repair history of a machine, e.g. maintenance activities or component replacements, this can also include error code or runtime message logs.\n",
    "* Failure history: The failure history of a machine or component of interest.\n",
    "\n",
    "It is possible that failure history is contained within maintenance history, either as in the form of special error codes or order dates for spare parts. In those cases, failures can be extracted from the maintenance data. Additionally, different business domains may have a variety of other data sources that influence failure patterns which are not listed here exhaustively. These should be identified by consulting the domain experts when building predictive models.\n",
    "\n",
    "Some examples of above data elements from use cases are:\n",
    "    \n",
    "**Machine conditions and usage:** Flight routes and times, sensor data collected from aircraft engines, sensor readings from ATM transactions, train events data, sensor readings from wind turbines, elevators and connected cars.\n",
    "    \n",
    "**Machine features:** Circuit breaker technical specifications such as voltage levels, geolocation or car features such as make, model, engine size, tire types, production facility etc.\n",
    "\n",
    "**Failure history:** fight delay dates, aircraft component failure dates and types, ATM cash withdrawal transaction failures, train/elevator door failures, brake disk replacement order dates, wind turbine failure dates and circuit breaker command failures.\n",
    "\n",
    "**Maintenance history:** Flight error logs, ATM transaction error logs, train maintenance records including maintenance type, short description etc. and circuit breaker maintenance records.\n",
    "\n",
    "Given the above data sources, the two main data types we observe in predictive maintenance domain are temporal data and static data. Failure history, machine conditions, repair history, usage history are time series indicated by the timestamp of data collection. Machine and operator specific features, are more static, since they usually describe the technical specifications of machines or operator’s properties.\n",
    "\n",
    "\n",
    "# Step 1: Data Ingestion\n",
    "\n",
    "This data aquisiton notebook will download the simulated predicitive maintenance data sets from blob storage. We do some preliminary data cleaning and verification, and store the results as a Spark data frame in cluster for use in the remaining notebook steps of this analysis.\n",
    "\n",
    "**Note:** This notebook will take less than 3 minutes to execute all cells on a Azure Databricks cluster of type Standard_DS13_v2 cluster, This include the time spent handling the _telemetry_ data set, which contains about 8.7 million records."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import time\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "import seaborn as sns\n",
    "import pandas as pd\n",
    "plt.style.use('ggplot')\n",
    "\n",
    "# Time the notebook execution. \n",
    "# This will only make sense if you \"Run all cells\"\n",
    "\n",
    "tic = time.time()\n",
    "\n",
    "parquet_files_names = {'machines':'machines_files.parquet','maint':'maint_files.parquet',\n",
    "         'errors': 'errors_files.parquet','telemetry':'telemetry_files.parquet',\n",
    "        'failures':'failure_files.parquet'}\n",
    "\n",
    "csv_files_names = {'machines':'machines.csv','maint':'maint.csv',\n",
    "         'errors': 'errors.csv','telemetry':'telemetry.csv',\n",
    "        'failures':'failures.csv'}\n",
    "\n",
    "target_dir = \"dbfs:/dataset/\"\n",
    "storage_path = \"wasb://predmaintenance@amlgitsamples.blob.core.windows.net/data/\"\n",
    "\n",
    "#dbutils.fs.rm(target_dir, recurse = True)\n",
    "dbutils.fs.mkdirs(target_dir) \n",
    "dbutils.fs.cp(storage_path, target_dir, recurse=True)\n",
    "display(dbutils.fs.ls(target_dir))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download simulated data sets\n",
    "\n",
    "The five data files are:\n",
    "\n",
    " * machines.csv\n",
    " * maint.csv\n",
    " * errors.csv\n",
    " * telemetry.csv\n",
    " * failures.csv\n",
    "\n",
    "To get an idea of what is contained in the data, we examine this machine schematic. \n",
    "![Machine schematic](https://raw.githubusercontent.com/Azure/MachineLearningSamples-PredictiveMaintenance/master/images/machine.png)\n",
    "\n",
    "There are 1000 machines of four different models. Each machine contains four components of interest, and four sensors measuring voltage, pressure, vibration and rotation. A controller monitors the system and raises alerts for five different error conditions. Maintenance logs indicate when something is done to the machine which does not include a component replacement. A failure is defined by the replacement of a component. \n",
    "\n",
    "This notebook does some preliminary data cleanup, creates summary graphics for each data set to verify the data downloaded correctly, and stores the resulting data sets in the cluster storage."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Machines data set\n",
    "\n",
    "This simulation tracks a simulated set of 1000 machines over the course of a single year (2015). \n",
    "\n",
    "This data set includes information about each machine: Machine ID, model type and age (years in service)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "machines =  spark.read.format(\"csv\") \\\n",
    "  .option(\"header\", \"true\") \\\n",
    "  .option(\"inferSchema\", \"true\") \\\n",
    "  .load(os.path.join(storage_path,csv_files_names['machines']))\n",
    "display(machines.take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The figure shows how long the collection of machines have been in service. It indicates there are four model types, shown in different colors, and all four models have been in service over the entire 20 years of service. The machine age will be a feature in our analysis, since we expect older machines may have a different set of errors and failures then machines that have not been in service long."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "machines_df = machines.toPandas()\n",
    "plt.figure(figsize=(14, 7))\n",
    "ax = sns.countplot(x=\"age\", hue=\"model\", data=machines_df,\n",
    "                   order = machines_df.age.value_counts().index).set_title(\"Machines age by model\")\n",
    "del machines_df\n",
    "display(ax.figure)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Errors  data set\n",
    "\n",
    "The error log contains non-breaking errors recorded while the machine is still operational. These errors are not considered failures, though they may be predictive of a future failure event. The error datetime field is rounded to the closest hour since the telemetry data (loaded later) is collected on an hourly rate."
   ]
  },
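  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The rounding described above can be sketched in plain Python. This is only an illustration, not the code that produced the data set: the helper `round_to_nearest_hour` and the sample timestamps are hypothetical, and we assume that a timestamp exactly on the half hour rounds up.\n",
    "\n",
    "```python\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "\n",
    "def round_to_nearest_hour(ts):\n",
    "    # Truncate to the start of the hour, then move to the next hour when\n",
    "    # the discarded part is at least 30 minutes (assumed half-up rounding).\n",
    "    floor = ts.replace(minute=0, second=0, microsecond=0)\n",
    "    if ts - floor >= timedelta(minutes=30):\n",
    "        floor += timedelta(hours=1)\n",
    "    return floor\n",
    "\n",
    "\n",
    "# Hypothetical raw error timestamps, for illustration only.\n",
    "raw = [datetime(2015, 1, 3, 7, 12, 40), datetime(2015, 1, 3, 7, 48, 5)]\n",
    "rounded = [round_to_nearest_hour(ts) for ts in raw]\n",
    "```\n",
    "\n",
    "Rounding this way puts error records on the same hourly grid as the telemetry records, so the two can be matched by machine and hour."
   ]
  },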
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "errors =  spark.read.format(\"csv\") \\\n",
    "  .option(\"header\", \"true\") \\\n",
    "  .option(\"inferSchema\", \"true\") \\\n",
    "  .load(os.path.join(storage_path,csv_files_names['errors']))\n",
    "display(errors.take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The figure shows how many errors occured in each of the five error classes over the entire year. We could split this figure over each individual machine, but with 1000 individuals, the figure would not be very informative."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, ax = plt.subplots(figsize=(14,7)) \n",
    "errors_count = (spark.createDataFrame(errors.groupBy('errorID').count().collect())\n",
    "              .toPandas())\n",
    "sns.barplot(errors_count['errorID'], errors_count['count'], alpha=0.8).set_title('Errors distribution')\n",
    "ax.set_ylabel(\"Count\")\n",
    "ax.set_xlabel(\"Error ID\")\n",
    "display(ax.figure)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Maintenance data set\n",
    "\n",
    "The maintenance log contains both scheduled and unscheduled maintenance records. Scheduled maintenance corresponds with  regular inspection of components, unscheduled maintenance may arise from mechanical failure or other performance degradations. A failure record is generated for component replacement in the case  of either maintenance events. Because maintenance events can also be used to infer component life, the maintenance data has been collected over two years (2014, 2015) instead of only over the year of interest (2015)."
   ]
  },
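  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough sketch of how component life might be inferred from maintenance events, the plain-Python example below measures the gap between consecutive replacements of the same component on the same machine. The sample records are hypothetical, and the field layout (datetime, machineID, comp) is an assumption made for illustration. The actual calculation is deferred to the feature engineering notebook.\n",
    "\n",
    "```python\n",
    "from collections import defaultdict\n",
    "from datetime import datetime\n",
    "\n",
    "# Hypothetical (datetime, machineID, comp) records, for illustration only.\n",
    "records = [\n",
    "    ('2014-06-01 06:00:00', 1, 'comp2'),\n",
    "    ('2014-12-13 06:00:00', 1, 'comp2'),\n",
    "    ('2015-01-05 06:00:00', 1, 'comp4'),\n",
    "    ('2015-03-28 06:00:00', 1, 'comp2'),\n",
    "]\n",
    "\n",
    "# Collect the replacement dates of each (machine, component) pair.\n",
    "dates = defaultdict(list)\n",
    "for stamp, machine_id, comp in records:\n",
    "    dates[(machine_id, comp)].append(datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S'))\n",
    "\n",
    "# Component life is the gap, in days, between consecutive replacements.\n",
    "life_days = {}\n",
    "for key, stamps in dates.items():\n",
    "    stamps.sort()\n",
    "    life_days[key] = [(b - a).days for a, b in zip(stamps, stamps[1:])]\n",
    "```\n",
    "\n",
    "A component with a single replacement yields an empty list here, which is one reason a longer maintenance history than the year of interest is useful."
   ]
  },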
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "maint=  spark.read.format(\"csv\") \\\n",
    "  .option(\"header\", \"true\") \\\n",
    "  .option(\"inferSchema\", \"true\") \\\n",
    "  .load(os.path.join(storage_path,csv_files_names['maint']))\n",
    "display(maint.take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The figure shows a histogram of component replacements divided into the four component types over the entire maintenance history. It looks like these four components are replaced at similar rates.\n",
    "\n",
    "There are many ways we might want to look at this data including calculating how long each component type lasts, or the time history of component replacements within each machine. This will take some preprocess of the data, which we are delaying until we do the feature engineering steps in the next example notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "fig, ax = plt.subplots() \n",
    "components_count = (spark.createDataFrame(maint.groupBy('comp').count().collect())\n",
    "              .toPandas())\n",
    "sns.barplot(components_count['comp'], components_count['count'], alpha=0.8).set_title('Components distribution')\n",
    "ax.set_ylabel(\"Count\")\n",
    "ax.set_xlabel(\"Component ID\")\n",
    "display(ax.figure)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Telemetry data set\n",
    "\n",
    "The telemetry time-series data consists of voltage, rotation, pressure, and vibration sensor measurements collected from each  machines in real time. The data is averaged over an hour and stored in the telemetry logs."
   ]
  },
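  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The hourly averaging can be pictured with the small plain-Python sketch below. It is not the code that generated the telemetry logs: the sample readings are hypothetical, only one sensor (voltage) is shown, and labeling each average with the start of its hour is an assumption.\n",
    "\n",
    "```python\n",
    "from collections import defaultdict\n",
    "from datetime import datetime\n",
    "from statistics import mean\n",
    "\n",
    "# Hypothetical raw (timestamp, machineID, voltage) readings, for illustration only.\n",
    "readings = [\n",
    "    (datetime(2015, 1, 1, 6, 5), 1, 170.0),\n",
    "    (datetime(2015, 1, 1, 6, 35), 1, 174.0),\n",
    "    (datetime(2015, 1, 1, 7, 10), 1, 168.0),\n",
    "]\n",
    "\n",
    "# Bucket each reading by machine and by the hour it falls in.\n",
    "buckets = defaultdict(list)\n",
    "for ts, machine_id, volt in readings:\n",
    "    hour = ts.replace(minute=0, second=0, microsecond=0)\n",
    "    buckets[(machine_id, hour)].append(volt)\n",
    "\n",
    "# One averaged record per machine per hour.\n",
    "hourly = {key: mean(values) for key, values in buckets.items()}\n",
    "```\n",
    "\n",
    "Each key of `hourly` corresponds to one row of the kind stored in the telemetry logs: one machine, one hour, one averaged value per sensor."
   ]
  },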
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "telemetry =  spark.read.format(\"csv\") \\\n",
    "  .option(\"header\", \"true\") \\\n",
    "  .option(\"inferSchema\", \"true\") \\\n",
    "  .load(os.path.join(storage_path, csv_files_names['telemetry']))\n",
    "\n",
    "# handle missing values\n",
    "# define groups of features \n",
    "telemetry_cols = telemetry.columns\n",
    "\n",
    "features_datetime = ['datetime']\n",
    "features_categorical = ['machineID']\n",
    "features_numeric = list(set(telemetry_cols) - set(features_datetime) - set(features_categorical))\n",
    "\n",
    "# Replace numeric NA with 0\n",
    "telemetry = telemetry.fillna(0, subset = features_numeric)\n",
    "\n",
    "# Replace categorical NA with 'Unknown'\n",
    "telemetry = telemetry.fillna(\"Unknown\", subset = features_categorical)\n",
    "\n",
    "# Counts...\n",
    "print(telemetry.count())\n",
    "\n",
    "# Examine 10 rows of data.\n",
    "display(telemetry.take(10))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Rather than plot 8.7 million data points, The figure shows one month worth of telemetry sensor data for one machine."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "plt_data = telemetry.filter(telemetry.machineID == 1).toPandas()\n",
    "\n",
    "# format datetime field which comes in as string\n",
    "plt_data['datetime'] = pd.to_datetime(plt_data['datetime'], format=\"%Y-%m-%d %H:%M:%S\")\n",
    "\n",
    "\n",
    "# Quick plot to show structure\n",
    "plot_df = plt_data.loc[(plt_data['datetime'] >= pd.to_datetime('2015-02-01')) &\n",
    "                       (plt_data['datetime'] <= pd.to_datetime('2015-03-01'))]\n",
    "\n",
    "plt_data = pd.melt(plot_df, id_vars=['datetime', 'machineID'])\n",
    "\n",
    "fig, ax = plt.subplots(figsize=(14,7))\n",
    "plt_data.groupby(['datetime','variable']).mean()['value']\\\n",
    "    .unstack().plot(ax=ax, title = 'Telemetry sensors measurements of one month')\n",
    "ax.legend(loc='center', bbox_to_anchor=(0.5, -0.05),ncol= 4,\n",
    "         fancybox=True, shadow=True)\n",
    "display(fig)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Failures data set\n",
    "\n",
    "Failures correspond to component replacements within the maintenance log. Each record contains the Machine ID, component type, and replacement datetime. These records will be used to create the machine learning labels we will be trying to predict."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "failures =  spark.read.format(\"csv\") \\\n",
    "  .option(\"header\", \"true\") \\\n",
    "  .option(\"inferSchema\", \"true\") \\\n",
    "  .load(os.path.join(storage_path,csv_files_names['failures']))\n",
    "display(failures.take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "d The following figure shows failure related replacements occured for each of the 4 component types over the entire year. of the failure records obtained from failure log. This log was built originally from component replacements the maintenance log file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "fig, ax = plt.subplots(figsize=(14,7)) \n",
    "failures_count = (spark.createDataFrame(failures.groupBy('failure').count().collect())\n",
    "              .toPandas())\n",
    "sns.barplot(failures_count['failure'], failures_count['count'], alpha=0.8).set_title('Failures distribution by component')\n",
    "ax.set_ylabel(\"Count\")\n",
    "ax.set_xlabel(\"Component ID\")\n",
    "display(ax.figure)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we write the spark dataframes to disk as parquet files."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "machines.write.mode('overwrite').parquet(os.path.join(target_dir,parquet_files_names['machines']))\n",
    "errors.write.mode('overwrite').parquet(os.path.join(target_dir,parquet_files_names['errors']))\n",
    "maint.write.mode('overwrite').parquet(os.path.join(target_dir,parquet_files_names['maint']))\n",
    "telemetry.write.mode('overwrite').parquet(os.path.join(target_dir,parquet_files_names['telemetry']))\n",
    "failures.write.mode('overwrite').parquet(os.path.join(target_dir,parquet_files_names['failures']))\n",
    "\n",
    "for key, val in csv_files_names.items():\n",
    "  dbutils.fs.rm(os.path.join(target_dir, csv_files_names[key]))\n",
    "\n",
    "toc = time.time()\n",
    "print(\"Full run took %.2f minutes\" % ((toc - tic)/60))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Conclusion\n",
    "\n",
    "We have now downloaded the required data files in csv format. We have perormed exploratory analysis on diffrenet dimension of the data sets. Then saved them as Spark data frames for use in the remaining analysis steps. The `2_feature_engineering.ipynb` Jupyter notebook will read these spark data frames and generate the modeling features for out predictive maintenance machine learning model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "amlenv",
   "language": "python",
   "name": "amlenv"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": "3"
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  },
  "name": "1_data_ingestion",
  "notebookId": 4432760086489961
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
